[1/N] Elastic EP Milestone 2 #26278
Conversation
Commits:
- stateless group
- elastic EP: support CUDA graph + peer weights transfer
- update state
- filter small fix
- bench script small fix
- fix intra-node to inter-node scaling
- remove unused code
👋 Hi! Thank you for contributing to the vLLM project.

💬 Join our developer Slack at https://slack.vllm.ai to discuss your PR in #pr-reviews, coordinate on features in #feat- channels, or join special interest groups in #sig- channels.

Just a reminder: PRs do not trigger a full CI run by default; only a reduced set of checks runs automatically. You can ask your reviewers to trigger select CI tests on top of those. Once the PR is approved and ready to go, your PR reviewer(s) can run CI to test the changes comprehensively before merging.

If you have any questions, please reach out to us on Slack at https://slack.vllm.ai. 🚀
This pull request has merge conflicts that must be resolved before it can be merged.
Code Review

This pull request introduces significant optimizations for elastic expert parallelism, building upon the initial support. The key changes include a new state machine for scaling up/down, peer-to-peer weight transfer for new workers, and progressive reconfiguration to avoid dropping traffic during scaling operations. The introduction of stateless communicators independent of `torch.distributed`'s global state is a major architectural shift enabling these features. My review has identified a critical bug in the state machine logic and several high-severity issues related to fragile implementation details that could lead to future breakages. Overall, this is a substantial and well-structured contribution, but the identified issues should be addressed to ensure robustness and correctness.
```python
class ScaleDownShutdownWorkerState(enum.IntEnum):
    PREPARE = 0
    EPLB_RESHUFFLE = 1
    COMPLETE = 1
```
In the `ScaleDownShutdownWorkerState` enum, `COMPLETE` is assigned the value 1, which is the same as `EPLB_RESHUFFLE`. This is a critical bug. When the state transitions to `COMPLETE`, it will be evaluated as `EPLB_RESHUFFLE` in `_progress_shutdown_worker`, causing the state machine to re-enter the `EPLB_RESHUFFLE` block instead of terminating. This will lead to incorrect behavior and prevent the worker from shutting down cleanly. The value for `COMPLETE` must be unique within the enum.
Suggested change:

```diff
 class ScaleDownShutdownWorkerState(enum.IntEnum):
     PREPARE = 0
     EPLB_RESHUFFLE = 1
-    COMPLETE = 1
+    COMPLETE = 2
```
```python
def get_next_stateless_world_group_port(self) -> list[int]:
    return self._stateless_world_group_port_list.pop(0)

def get_next_stateless_dp_group_port(self) -> list[int]:
    return self._stateless_dp_group_port_list.pop(0)

def get_next_stateless_ep_group_port(self) -> list[int]:
    return self._stateless_ep_group_port_list.pop(0)
```
These methods use `pop(0)` to retrieve a port from a list without checking if the list is empty. If the port lists (`_stateless_world_group_port_list`, `_stateless_dp_group_port_list`, `_stateless_ep_group_port_list`) are exhausted for any reason, this will raise an `IndexError` and crash the process. While the logic in `__post_init__` seems to pre-allocate the necessary ports, this design is fragile. A more robust implementation would check whether the list is empty before popping and raise a more informative error.
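A minimal sketch of such a guarded accessor; the wrapper class and error message are illustrative assumptions, and only the attribute and method names come from the snippet above:

```python
# Sketch only: the class wrapper is illustrative, not the PR's actual structure.
class PortAllocator:
    def __init__(self, stateless_world_group_port_list: list[list[int]]):
        self._stateless_world_group_port_list = stateless_world_group_port_list

    def get_next_stateless_world_group_port(self) -> list[int]:
        # Raise a descriptive error instead of a bare IndexError when the
        # pre-allocated port list has been exhausted.
        if not self._stateless_world_group_port_list:
            raise RuntimeError(
                "Stateless world-group port list is exhausted; "
                "verify the pre-allocation in __post_init__."
            )
        return self._stateless_world_group_port_list.pop(0)
```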
```python
# Check if this is a stateless process group
from torch.distributed.distributed_c10d import _world
is_stateless = _world.pg_map.get(cpu_group, None) is None
```
The check `_world.pg_map.get(cpu_group, None) is None` relies on an internal, undocumented implementation detail of `torch.distributed` to determine if a process group is stateless. This is a brittle approach that could break with future PyTorch updates. It would be more robust to use an explicit mechanism to identify stateless groups, such as a custom process group class that carries this information, or passing a flag during initialization.
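One possible shape of such an explicit mechanism, as a hedged sketch; `GroupHandle` and its fields are assumptions for illustration and not part of this PR or of PyTorch:

```python
from dataclasses import dataclass
from typing import Any

# Sketch: record statelessness explicitly at group-creation time instead of
# probing torch.distributed internals such as _world.pg_map.
@dataclass
class GroupHandle:
    group: Any          # the underlying (stateful or stateless) process group
    is_stateless: bool  # set once by whoever created the group

def is_stateless_group(handle: GroupHandle) -> bool:
    # Callers consult the explicit flag; no private PyTorch state is touched.
    return handle.is_stateless
```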
```python
if op.op.__name__ == "isend":
    self.send(op.tensor, op.group_peer, stream)
elif op.op.__name__ == "irecv":
    self.recv(op.tensor, op.group_peer, stream)
```
Checking `op.op.__name__` to determine the operation type is fragile. The name of a function can change, or it could be wrapped by a decorator, which would break this logic. It's more robust to check for function identity directly.
Suggested change:

```diff
-if op.op.__name__ == "isend":
-    self.send(op.tensor, op.group_peer, stream)
-elif op.op.__name__ == "irecv":
-    self.recv(op.tensor, op.group_peer, stream)
+if op.op is torch.distributed.isend:
+    self.send(op.tensor, op.group_peer, stream)
+elif op.op is torch.distributed.irecv:
+    self.recv(op.tensor, op.group_peer, stream)
```
```python
if ep_group not in _world.pg_map:
    ep_group = get_ep_group()
```
The check `if ep_group not in _world.pg_map:` relies on an internal implementation detail of PyTorch's distributed library (`_world.pg_map`) to detect stateless process groups. This is not a public API and is subject to change without notice, which makes this code brittle. A more robust approach, such as using a custom process group class or an explicit flag, should be used to differentiate between stateful and stateless groups.
💡 Codex Review
Here are some automated review suggestions for this pull request.
```python
for tensor in tensor_list:
    if tensor.numel() == 0:
        continue
    if self.device_communicator and tensor.is_cuda:
        self.device_communicator.broadcast(tensor, src)
    else:
        self.tcp_store_group.broadcast(tensor, src)
```
**Broadcast ignores returned tensor for CPU path**

In `StatelessGroupCoordinator.broadcast_tensor_dict`, the non-CUDA path calls `self.tcp_store_group.broadcast(tensor, src)` but discards the returned tensor (lines 197-203). For `StatelessProcessGroup`, `broadcast` returns a new tensor and does not mutate the supplied tensor when the caller is not the source rank. Because the result is ignored, every CPU tensor in `tensor_list` remains uninitialized on non-source ranks. Any stateless group without a device communicator (e.g. the world group) or any broadcast of CPU tensors will therefore propagate zeros instead of the actual data, breaking synchronization of metadata and tensors across ranks. The returned tensor needs to be captured or copied into the preallocated tensor.
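A minimal sketch of the suggested fix, reusing the loop and attribute names from the snippet above (it assumes the surrounding `broadcast_tensor_dict` method); the only change is capturing the returned tensor and copying it back:

```python
for tensor in tensor_list:
    if tensor.numel() == 0:
        continue
    if self.device_communicator and tensor.is_cuda:
        self.device_communicator.broadcast(tensor, src)
    else:
        # StatelessProcessGroup.broadcast returns the received tensor on
        # non-source ranks instead of mutating the input, so copy the result
        # back into the preallocated buffer.
        received = self.tcp_store_group.broadcast(tensor, src)
        if received is not tensor:
            tensor.copy_(received)
```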
Purpose
PR #20775 introduces initial support of elastic expert parallelism. This PR adds further optimizations towards Milestone 2 in #20323. Key features include:

- A state machine for scaling up and down, together with peer-to-peer weight transfer to new workers, implemented in `vllm/distributed/elastic_ep/elastic_state.py` and `vllm/distributed/elastic_ep/elastic_execute.py`.
- Progressive reconfiguration driven by `DPEngineCoreProc` (sketched below). In `run_busy_loop`, `elastic_scaling_state.progress()` is called to progress reconfiguration by one step if ready. If reconfiguration cannot continue, existing workers continue to serve requests. Such progressive reconfiguration between forward steps also helps to quickly finish in-flight user requests, prevents requests from queuing up, and improves SLO attainment.
- When elastic EP is enabled (`--enable-elastic-ep`), all EP/DP communicators will be replaced by the stateless communicators in `vllm/distributed/stateless_coordinator.py`, which are independent of `torch.distributed`'s global state. We can therefore create standby communicators while keeping the current ones, enabling the bootstrap of new workers to overlap with request serving on existing workers. We only need to do a single switch to the new communicators once we are ready to move to the new setup.
- The switch to the new setup, together with the associated preparation, happens in `switch_and_prepare()` in `vllm/distributed/elastic_ep/elastic_execute.py`. We will introduce optimizations on CUDA graphs in follow-up PRs.

There are also some minor bug fixes.
Test Plan
We test performance before and after scale-up using Qwen/Qwen3-30B-A3B-Thinking-2507-FP8. The number of physical experts per GPU is set to 72. We note that the number of local physical experts remains the same during scale-up and scale-down, while the total number of redundant experts scales accordingly, which is the same assumption as in PR #20775. We use PPLX kernels (intra-node mode that does not require NVSHMEM) and enable CUDA graphs with default settings.
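For a sense of scale (an illustrative calculation assuming the model's 128 routed experts, not a number reported in this PR): 72 physical experts per GPU gives 2 × 72 = 144 expert slots (16 redundant) at EP=2 and 4 × 72 = 288 slots (160 redundant) at EP=4, so the redundant-expert count grows with EP size while the per-GPU count stays fixed.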
To scale up we use:
Test Results
We use the following benchmark script.
Serving on 2 GPUs (EP=2, TP=1) before scaling up:
Serving on 4 GPUs (EP=4, TP=1) after scaling up:
Next Steps
CC List
@abmfy @ruisearch42 @simon-mo @tlrmchlsmth @njhill @kouroshHakha